package com.liruirui.springboot_exam.kafka;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.liruirui.springboot_exam.entity.EsRepository;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import org.springframework.ui.Model;

@Slf4j
@Component
public class MovieConsumer {

    @Autowired
    EsRepository esRepository;

    @KafkaListener(topics = "topic1")
    public void onMessage(String msg, Acknowledgment acknowledgment) {
        log.info("这是msg:{}",msg);
        Model model = JSON.parseObject(msg, Model.class);
        if (model != null){
//            esRepository.save(model);
        }
        acknowledgment.acknowledge();
    }
}
